package doris;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.BaseApp;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import java.util.Properties;

/**
 * Demo job that writes a few hard-coded JSON records into the Doris table {@code test.table1}
 * through the Doris Flink connector's {@link DorisSink}. Records are converted to
 * {@link RowData} and serialized as JSON by a {@link RowDataSerializer}.
 *
 * @Author lzc
 * @Date 2023/6/3 14:11
 */
public class DorisStreamSink2 extends BaseApp {

    /**
     * Entry point: starts the job through {@code BaseApp.init}. The meaning of each argument is
     * defined by {@code BaseApp}, which is not part of this file.
     */
    public static void main(String[] args) {
        new DorisStreamSink2().init(
            30001,
            2,
            "DorisDemo",
            "ods_log"
        );
    }

    /**
     * Builds the pipeline: sample JSON strings -> {@link RowData} -> Doris sink.
     *
     * <p>NOTE(review): the {@code stream} supplied by {@code BaseApp} is not used. This demo
     * only writes the fixed sample records created with {@code env.fromElements}.
     *
     * @param env    the Flink execution environment the pipeline is added to
     * @param stream source stream provided by {@code BaseApp} (unused here)
     */
    @Override
    public void handle(StreamExecutionEnvironment env,
                       DataStreamSource<String> stream) {

        // Target column names and their Flink types. The order must match the positions that
        // JsonToRowData writes into each GenericRowData.
        String[] fields = {"siteid", "citycode", "username", "pv"};
        DataType[] dataTypes = {
            DataTypes.INT(),
            DataTypes.SMALLINT(),
            DataTypes.STRING(),
            DataTypes.BIGINT()
        };

        // Stream Load request properties: JSON payload, one JSON object per line
        // (presumably matching the JSON serializer configured below).
        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true");

        SingleOutputStreamOperator<RowData> source = env
            .fromElements(
                "{\"siteid\": \"3000\", \"citycode\": \"1001\", \"username\": \"ww\",\"pv\": \"100\"}",
                "{\"siteid\": \"5000\", \"citycode\": \"1001\", \"username\": \"ww\",\"pv\": \"100\"}",
                "{\"siteid\": \"6000\", \"citycode\": \"1001\", \"username\": \"ww\",\"pv\": \"100\"}"
            )
            // A static nested class keeps the enclosing job instance out of the serialized
            // user function.
            .map(new JsonToRowData());

        DorisSink<RowData> sink = DorisSink.<RowData>builder()
            .setDorisReadOptions(DorisReadOptions.builder().build())
            // Doris connection parameters.
            // NOTE(review): credentials are hard-coded for this demo; externalize them before
            // any real use.
            .setDorisOptions(DorisOptions.builder()
                                 .setFenodes("hadoop162:7030")
                                 .setTableIdentifier("test.table1")
                                 .setUsername("root")
                                 .setPassword("aaaaaa")
                                 .build())
            // Execution (write) parameters.
            .setDorisExecutionOptions(DorisExecutionOptions.builder()
                                          // With two-phase commit enabled, the Stream Load label
                                          // prefix (setLabelPrefix) must be globally unique. 2PC is
                                          // disabled here to keep repeated test runs simple.
                                          .disable2PC()
                                          .setDeletable(false)
                                          // NOTE(review): per the connector docs these are
                                          // write-buffer settings:
                                          //   sink.buffer-count (default 3),
                                          //   sink.buffer-size (default 1MB),
                                          //   sink.check-interval (ms, load-exception check interval).
                                          // They are not OR'ed per-batch row/byte/time flush limits.
                                          // Confirm against the connector version in use.
                                          .setBufferCount(3)
                                          .setBufferSize(8 * 1024)
                                          .setCheckInterval(3000)
                                          .setMaxRetries(3)
                                          .setStreamLoadProp(props)
                                          .build())
            .setSerializer(RowDataSerializer.builder()
                               .setType(LoadConstants.JSON)
                               .setFieldNames(fields)
                               .setFieldType(dataTypes)
                               .build())
            .build();
        source.sinkTo(sink);
    }

    /**
     * Parses one sample JSON string into a 4-field {@link GenericRowData} with positions
     * 0 = siteid (int), 1 = citycode (short), 2 = username (string), 3 = pv (long). This order
     * matches the {@code fields}/{@code dataTypes} arrays declared in {@code handle}.
     *
     * <p>Declared {@code static} so that Flink serializes only this function, not the
     * enclosing job. NOTE(review): how absent or non-numeric values are treated is determined
     * by fastjson's {@code get*Value} accessors — confirm if inputs can be incomplete.
     */
    private static final class JsonToRowData implements MapFunction<String, RowData> {
        private static final long serialVersionUID = 1L;

        @Override
        public RowData map(String json) {
            JSONObject obj = JSON.parseObject(json);
            GenericRowData rowData = new GenericRowData(4);
            rowData.setField(0, obj.getIntValue("siteid"));
            rowData.setField(1, obj.getShortValue("citycode"));
            rowData.setField(2, StringData.fromString(obj.getString("username")));
            rowData.setField(3, obj.getLongValue("pv"));
            return rowData;
        }
    }
}
